-
Notifications
You must be signed in to change notification settings - Fork 0
Clone kafka 18894 #28
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
WalkthroughReplaces direct ConfigProvider usage with Plugin wrappers across config resolution and Connect runtime. Updates constructors, fields, and maps to hold plugins, adjusts transform and close flows to use plugin.get() and plugin-aware cleanup, extends Plugins.newConfigProvider to return wrapped plugins with metrics, and adds tests plus a monitorable provider. Changes
Sequence Diagram(s)sequenceDiagram
autonumber
actor App as Caller
participant AC as AbstractConfig
participant P as Plugins
participant PL as Plugin<ConfigProvider>
participant CP as ConfigProvider
participant CT as ConfigTransformer
App->>AC: resolveConfigVariables(props)
AC->>P: newConfigProvider(config, providerName, usage, metrics)
P-->>AC: PL = Plugin.wrapInstance(CP, metrics, CONFIG_PROVIDERS_CONFIG, {provider})
AC->>CT: new ConfigTransformer(Map<String, PL>)
note over CT: Transformer holds Plugin-wrapped providers
App->>CT: transform(values)
CT->>PL: get()
PL-->>CT: CP
CT->>CP: get(path, keys)
CP-->>CT: ConfigData
CT-->>App: transformed values
App->>PL: close()
note over PL: Closed via plugin-aware cleanup
sequenceDiagram
autonumber
actor W as Worker/MirrorMaker
participant P as Plugins
participant PL as Plugin<ConfigProvider>
participant WCT as WorkerConfigTransformer
participant CT as ConfigTransformer
W->>P: newConfigProvider(config, providerName, usage, metrics)
P-->>W: PL
W->>WCT: new(..., Map<String, PL>)
WCT->>CT: new ConfigTransformer(Map<String, PL>)
note over WCT,CT: Transform flow uses plugin.get() to access provider
W->>WCT: close()
WCT->>PL: close() (Utils.closeQuietly, "config provider plugin")
Estimated code review effort🎯 4 (Complex) | ⏱️ ~60 minutes Poem
Pre-merge checks and finishing touches❌ Failed checks (3 warnings)
✨ Finishing touches
🧪 Generate unit tests
Warning There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure. 🔧 ast-grep (0.39.5)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.javaThanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
Summary of ChangesHello @DDShantanuBadmanji, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly refactors how Highlights
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request refactors ConfigProvider instantiation to use a Plugin wrapper, enabling metrics for config providers via the Monitorable interface. The changes are well-integrated across the client, connect-runtime, and mirror-maker modules, and include corresponding test updates. This is a good improvement for observability. I have a few minor suggestions to improve code clarity and robustness in a test utility.
| } | ||
| } | ||
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better readability and maintainability, it's good practice to use more descriptive variable names in lambda expressions. Using plugin instead of x would make the code's intent clearer.
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| providerPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin")); |
| public ConfigData get(String path) { | ||
| return null; | ||
| } | ||
|
|
||
| @Override | ||
| public ConfigData get(String path, Set<String> keys) { | ||
| return null; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The get methods return null, which can lead to a NullPointerException in callers like ConfigTransformer that don't expect a null ConfigData. To make this test utility more robust and align with the implicit contract of ConfigProvider, it's better to return an empty ConfigData object.
| public ConfigData get(String path) { | |
| return null; | |
| } | |
| @Override | |
| public ConfigData get(String path, Set<String> keys) { | |
| return null; | |
| } | |
| @Override | |
| public ConfigData get(String path) { | |
| return new ConfigData(java.util.Collections.emptyMap()); | |
| } | |
| @Override | |
| public ConfigData get(String path, Set<String> keys) { | |
| return new ConfigData(java.util.Collections.emptyMap()); | |
| } |
| ConfigTransformer transformer = new ConfigTransformer(providerPlugins); | ||
| Map<String, String> transformed = transformer.transform(props).data(); | ||
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better readability and maintainability, it's good practice to use more descriptive variable names in lambda expressions. Using plugin instead of x would make the code's intent clearer.
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| providerPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin")); |
| @Override | ||
| public void close() { | ||
| configProviders.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For better readability and maintainability, it's good practice to use more descriptive variable names in lambda expressions. Using plugin instead of x would make the code's intent clearer.
| configProviderPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| configProviderPlugins.values().forEach(plugin -> Utils.closeQuietly(plugin, "config provider plugin")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
632-650: Change wrap order to before configuration
Wrap the provider withPlugin.wrapInstancebefore callingconfigure()so that Monitorable providers receive metrics during theirconfigure(...)call.
🧹 Nitpick comments (8)
clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java (1)
32-35: Clarify when metrics and tags are attached to providersTags are added only when the provider is wrapped via Plugins.newConfigProvider and a non-null Metrics is supplied; the class tag is the simple class name. Consider tightening the Javadoc to avoid implying unconditional tagging.
- * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics. - * The following tags are automatically added to all metrics registered: <code>config</code> set to - * <code>config.providers</code>, <code>class</code> set to the ConfigProvider class name, - * and <code>provider</code> set to the provider name. + * <p>Implement {@link org.apache.kafka.common.metrics.Monitorable} to enable the config provider to register metrics + * when created via {@code Plugins.newConfigProvider(...)} with a non-null {@code Metrics}. + * The following tags are added to all metrics registered: <code>config</code> = <code>config.providers</code>, + * <code>class</code> = the provider's simple class name, and <code>provider</code> = the configured provider name.connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
270-287: Avoid null entries in providerPlugins; filter before put and closenewConfigProvider may return null; skip adding those and you won’t rely on Utils.closeQuietly handling nulls. Keeps the map clean and intent clear.
- Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); + Map<String, Plugin<ConfigProvider>> providerPlugins = new HashMap<>(); for (String name : providerNames) { Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( this, name, Plugins.ClassLoaderUsage.PLUGINS, null ); - providerPlugins.put(name, configProviderPlugin); + if (configProviderPlugin != null) { + providerPlugins.put(name, configProviderPlugin); + } } ConfigTransformer transformer = new ConfigTransformer(providerPlugins); Map<String, String> transformed = transformer.transform(props).data(); - providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); + providerPlugins.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin")); return transformed;clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (1)
624-633: Add provider name as a Plugin metrics tag for better observabilityInclude the provider’s logical name in Plugin tags to aid metrics/log correlation.
- Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance(provider, null, CONFIG_PROVIDERS_CONFIG); + Plugin<ConfigProvider> providerPlugin = Plugin.wrapInstance( + provider, + null, + CONFIG_PROVIDERS_CONFIG, + Collections.singletonMap("name", entry.getKey()) + );connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java (1)
210-223: Clean up partially created provider plugins if initialization failsIf creating a later provider or constructing
WorkerConfigTransformerthrows, earlier plugins remain unclosed. Add failure-path cleanup.private WorkerConfigTransformer initConfigTransformer() { final List<String> providerNames = config.getList(WorkerConfig.CONFIG_PROVIDERS_CONFIG); - Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(); - for (String providerName : providerNames) { - Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( - config, - providerName, - ClassLoaderUsage.PLUGINS, - metrics.metrics() - ); - providerPluginMap.put(providerName, configProviderPlugin); - } - return new WorkerConfigTransformer(this, providerPluginMap); + Map<String, Plugin<ConfigProvider>> providerPluginMap = new HashMap<>(); + try { + for (String providerName : providerNames) { + Plugin<ConfigProvider> configProviderPlugin = plugins.newConfigProvider( + config, + providerName, + ClassLoaderUsage.PLUGINS, + metrics.metrics() + ); + providerPluginMap.put(providerName, configProviderPlugin); + } + return new WorkerConfigTransformer(this, providerPluginMap); + } catch (Throwable t) { + providerPluginMap.values().forEach(p -> Utils.closeQuietly(p, "config provider plugin")); + throw t; + } }connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
23-30: Close the transformer in tests to exercise/verify plugin lifecycleAdd
@AfterEachto closeconfigTransformer; future-proofs tests if providers acquire resources.@@ -import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.AfterEach; @@ public void setup() { configTransformer = new WorkerConfigTransformer(worker, Map.of("test", Plugin.wrapInstance(new TestConfigProvider(), null, "config.providers"))); } + + @AfterEach + public void tearDown() { + if (configTransformer != null) { + configTransformer.close(); + } + }Also applies to: 67-71
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
46-52: Plugin-wrapped providers: wiring looks correctConstructor and field migration to Map<String, Plugin> aligns with the Plugin API; passing the plugin map to ConfigTransformer is consistent with the PR direction. Consider guarding against external mutation by copying/wrapping the map.
Apply within this hunk:
- this.configProviderPlugins = configProviderPlugins; - this.configTransformer = new ConfigTransformer(configProviderPlugins); + this.configProviderPlugins = java.util.Collections.unmodifiableMap(new java.util.HashMap<>(configProviderPlugins)); + this.configTransformer = new ConfigTransformer(this.configProviderPlugins);Confirm ConfigTransformer’s constructor now accepts Map<String, Plugin> to avoid a cross-module signature mismatch.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (1)
2904-2927: New test asserts provider metrics exposure — ensure init timingThe assertions read metrics immediately after Worker construction (without worker.start()). If Worker’s provider initialization ever moves to start(), this test could become flaky.
Options:
- Call worker.start() and worker.stop() in this test, or
- Add a small accessor/trigger that initializes provider plugins deterministically for tests.
Verify this test passes consistently across configurations (no reliance on side effects in the constructor).
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (1)
404-412: withPluginMetrics-after-configure ordering is validatedCreating a monitorable provider with Metrics and asserting instance type relies on overridden assertion for order — good. Consider also asserting at least one “plugins” metric exists to strengthen the test.
Example follow-up within this test:
- Inspect new Metrics instance for a MetricName with group "plugins" and class tag CustomMonitorableConfigProvider.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (12)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java(4 hunks)clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java(3 hunks)clients/src/main/java/org/apache/kafka/common/config/provider/ConfigProvider.java(1 hunks)clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java(5 hunks)clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java(1 hunks)connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java(2 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java(1 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java(3 hunks)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java(3 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java(5 hunks)connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java(4 hunks)
🧰 Additional context used
🧬 Code graph analysis (10)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala (1)
withPluginMetrics(6299-6301)
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java (1)
WorkerConfig(58-463)
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/utils/Utils.java (1)
Utils(93-1700)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (2)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java (1)
Plugins(59-696)
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
MonitorableConfigProvider(29-58)clients/src/main/java/org/apache/kafka/common/internals/Plugin.java (1)
Plugin(33-92)clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java (1)
Metrics(70-696)
🔇 Additional comments (10)
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorMakerConfig.java (1)
26-26: Import looks correct for Plugin usageUsing org.apache.kafka.common.internals.Plugin aligns with the wrapped-provider approach introduced in this PR.
clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java (1)
98-105: LGTM: correct use of plugin.get() for provider accessNull-safe access and delegation to ConfigProvider#get(path, keys) via the wrapper looks good.
clients/src/test/java/org/apache/kafka/common/config/ConfigTransformerTest.java (1)
49-55: Tests updated appropriately for Plugin-wrapped providersWrapping TestConfigProvider with Plugin and switching to Map.of() reads clean and validates the new ConfigTransformer contract.
Also applies to: 63-68, 72-75, 79-82, 86-89, 93-98, 102-105
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfigTransformer.java (1)
101-101: Close the plugin, not just the provider — goodClosing the Plugin ensures both provider and associated plugin metrics are torn down. LGTM.
clients/src/test/java/org/apache/kafka/common/config/provider/MonitorableConfigProvider.java (1)
35-38: Validates PluginMetrics hookupUsing PluginMetrics.metricName and registering a measurable is appropriate for exercising plugin-scoped metrics in tests. LGTM.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java (3)
382-389: Mocks return Plugin correctlyWrapping MockFileConfigProvider with Plugin and tagging by provider=file matches the new API. Passing null Metrics intentionally avoids metric binding for this mock. LGTM.
Please confirm the intent is to skip plugin metrics for the file provider in these tests (null metrics).
2930-2935: Expected tag map helper — clear and reusableHelper builds stable tag set for plugin metrics; matches Plugin.tags key/class plus extra. LGTM.
2937-2951: Metric assertions are preciseFiltering by group "plugins" and verifying name/description and exact tags gives strong coverage of PluginMetrics wiring. LGTM.
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/PluginsTest.java (2)
381-402: Adaptation to Plugin APIUsing providerName and asserting on plugin.get() class while keeping classloader checks is correct. LGTM.
810-816: Custom monitorable provider for ordering checkOverriding withPluginMetrics to assert configured=true neatly validates configure-then-metrics sequence. LGTM.
| Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); | ||
|
|
||
| if (!providers.isEmpty()) { | ||
| ConfigTransformer configTransformer = new ConfigTransformer(providers); | ||
| if (!providerPlugins.isEmpty()) { | ||
| ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); | ||
| ConfigTransformerResult result = configTransformer.transform(indirectVariables); | ||
| if (!result.data().isEmpty()) { | ||
| resolvedOriginals.putAll(result.data()); | ||
| } | ||
| } | ||
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | ||
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Always close provider plugins on exceptions during variable resolution
If ConfigTransformer.transform(...) throws, providers aren’t closed, leaking resources. Wrap usage in try/finally and close in the finally block.
- Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
-
- if (!providerPlugins.isEmpty()) {
- ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
- ConfigTransformerResult result = configTransformer.transform(indirectVariables);
- if (!result.data().isEmpty()) {
- resolvedOriginals.putAll(result.data());
- }
- }
- providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+ Map<String, Plugin<ConfigProvider>> providerPlugins =
+ instantiateConfigProviders(providerConfigString, configProperties, classNameFilter);
+ try {
+ if (!providerPlugins.isEmpty()) {
+ ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins);
+ ConfigTransformerResult result = configTransformer.transform(indirectVariables);
+ if (!result.data().isEmpty()) {
+ resolvedOriginals.putAll(result.data());
+ }
+ }
+ } finally {
+ providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin"));
+ }📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| Map<String, Plugin<ConfigProvider>> providerPlugins = instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); | |
| if (!providers.isEmpty()) { | |
| ConfigTransformer configTransformer = new ConfigTransformer(providers); | |
| if (!providerPlugins.isEmpty()) { | |
| ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); | |
| ConfigTransformerResult result = configTransformer.transform(indirectVariables); | |
| if (!result.data().isEmpty()) { | |
| resolvedOriginals.putAll(result.data()); | |
| } | |
| } | |
| providers.values().forEach(x -> Utils.closeQuietly(x, "config provider")); | |
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| Map<String, Plugin<ConfigProvider>> providerPlugins = | |
| instantiateConfigProviders(providerConfigString, configProperties, classNameFilter); | |
| try { | |
| if (!providerPlugins.isEmpty()) { | |
| ConfigTransformer configTransformer = new ConfigTransformer(providerPlugins); | |
| ConfigTransformerResult result = configTransformer.transform(indirectVariables); | |
| if (!result.data().isEmpty()) { | |
| resolvedOriginals.putAll(result.data()); | |
| } | |
| } | |
| } finally { | |
| providerPlugins.values().forEach(x -> Utils.closeQuietly(x, "config provider plugin")); | |
| } |
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
around lines 550 to 560, instantiateConfigProviders is called and then
ConfigTransformer.transform(...) is invoked without guaranteeing provider
plugins are closed on exceptions; to fix, move the providerPlugins usage into a
try/finally: keep providerPlugins created before the try, call new
ConfigTransformer(providerPlugins) and transform(...) inside the try, apply
resolvedOriginals.putAll(...) only if result.data() is non-empty, and in the
finally block always iterate providerPlugins.values() and
Utils.closeQuietly(...) each to ensure plugins are closed even if transform
throws.
| * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances. | ||
| */ | ||
| public ConfigTransformer(Map<String, ConfigProvider> configProviders) { | ||
| this.configProviders = configProviders; | ||
| public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) { | ||
| this.configProviderPlugins = configProviderPlugins; | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix constructor @param Javadoc to reflect Plugin-wrapped providers
The parameter now accepts Map<String, Plugin>; the doc still mentions raw ConfigProvider instances.
- * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances.
+ * @param configProviderPlugins a Map of provider names to Plugin-wrapped {@link ConfigProvider} instances.📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| * @param configProviderPlugins a Map of provider names and {@link ConfigProvider} instances. | |
| */ | |
| public ConfigTransformer(Map<String, ConfigProvider> configProviders) { | |
| this.configProviders = configProviders; | |
| public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) { | |
| this.configProviderPlugins = configProviderPlugins; | |
| } | |
| * @param configProviderPlugins a Map of provider names to Plugin-wrapped {@link ConfigProvider} instances. | |
| */ | |
| public ConfigTransformer(Map<String, Plugin<ConfigProvider>> configProviderPlugins) { | |
| this.configProviderPlugins = configProviderPlugins; | |
| } |
🤖 Prompt for AI Agents
In clients/src/main/java/org/apache/kafka/common/config/ConfigTransformer.java
around lines 65 to 69, the constructor Javadoc @param incorrectly says the map
contains raw ConfigProvider instances; update the @param description to state it
accepts a Map of provider names to Plugin-wrapped ConfigProvider instances
(Map<String, Plugin<ConfigProvider>>), briefly mentioning that each value is a
Plugin<ConfigProvider> rather than a direct ConfigProvider to match the
constructor signature.
Delete this text and replace it with a detailed description of your change. The
PR title and body will become the squashed commit message.
If you would like to tag individuals, add some commentary, upload images, or
include other supplemental information that should not be part of the eventual
commit message, please use a separate comment.
If applicable, please include a summary of the testing strategy (including
rationale) for the proposed change. Unit and/or integration tests are expected
for any behavior change and system tests should be considered for larger
changes.
Summary by CodeRabbit
New Features
Documentation
Refactor
Tests